package com.fwmagic.dynamic_rule.engine;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.ResultBean;
import com.fwmagic.dynamic_rule.functions.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Static rule engine, version 3.
 * Recent data is looked up in State, older data is looked up in ClickHouse, and
 * conditions that span the boundary between the two are queried in segments.
 */
public class RuleEngineV3 {

    /**
     * Entry point: creates a local Flink environment (with web UI), wires the
     * rule-matching pipeline, prints every matched result and runs the job.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception anything raised while building or executing the job
     */
    public static void main(String[] args) throws Exception {
        Configuration flinkConf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf);

        // Print whatever the rule engine matched.
        buildMatchStream(env).print();

        // Submit the job under its fixed name.
        env.execute("RuleEngineV3");
    }

    /**
     * Wires Kafka source -> JSON-to-LogBean mapping -> keying -> rule process function.
     *
     * @param env the environment the source is registered on
     * @return the stream of results emitted by {@code RuleProcessFunctionV3}
     */
    private static SingleOutputStreamOperator<ResultBean> buildMatchStream(StreamExecutionEnvironment env) {
        // Kafka source built by the project helper for "yinew_applog"
        // (presumably the topic name -- confirm in SourceFunctions).
        DataStreamSource<String> rawJson =
                env.addSource(SourceFunctions.createKafkaSource("yinew_applog"));

        // Convert each JSON record into a LogBean.
        SingleOutputStreamOperator<LogBean> logBeans = rawJson.map(new Json2BeanMapFunction());

        // Partition the stream using the device-id key selector.
        KeyedStream<LogBean, String> byDevice = logBeans.keyBy(new DeviceIdKeySelector());

        // Core matching logic of the rule engine.
        return byDevice.process(new RuleProcessFunctionV3());
    }
}
